package drds.plus.sql_process.optimizer;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import drds.plus.common.Constants;
import drds.plus.common.jdbc.Parameters;
import drds.plus.common.jdbc.SqlTypeParser;
import drds.plus.common.lifecycle.AbstractLifecycle;
import drds.plus.common.model.RepositoryType;
import drds.plus.common.model.SqlType;
import drds.plus.common.model.hint.DirectlyRouteCondition;
import drds.plus.common.model.hint.RouteCondition;
import drds.plus.common.model.hint.RuleRouteCondition;
import drds.plus.common.properties.ConnectionProperties;
import drds.plus.common.thread_local.ThreadLocalMap;
import drds.plus.parser.abstract_syntax_tree.statement.Statement;
import drds.plus.rule_engine.rule_calculate.DataNodeDataScatterInfo;
import drds.plus.rule_engine.table_rule.TableRule;
import drds.plus.sql_process.abstract_syntax_tree.ObjectCreateFactory;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.dml.IPut;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.Function;
import drds.plus.sql_process.abstract_syntax_tree.node.dml.*;
import drds.plus.sql_process.abstract_syntax_tree.node.query.*;
import drds.plus.sql_process.abstract_syntax_tree.node.query.build.QueryBuilder;
import drds.plus.sql_process.abstract_syntax_tree.node.query.build.QueryMergeBuilder;
import drds.plus.sql_process.optimizer.chooser.*;
import drds.plus.sql_process.optimizer.execute_plan_optimizer.*;
import drds.plus.sql_process.optimizer.pre_processor.FilterPreProcessor;
import drds.plus.sql_process.optimizer.pre_processor.JoinPreProcessor;
import drds.plus.sql_process.optimizer.pre_processor.SequencePreProcessor;
import drds.plus.sql_process.optimizer.pre_processor.SubQueryPreProcessor;
import drds.plus.sql_process.optimizer.pusher.FilterPusher;
import drds.plus.sql_process.optimizer.pusher.OrderByPusher;
import drds.plus.sql_process.parser.*;
import drds.plus.sql_process.parser.visitor.SqlVisitor;
import drds.plus.sql_process.rule.RouteOptimizer;
import drds.plus.util.ExtraCmd;
import drds.tools.$;
import drds.tools.ShouldNeverHappenException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * <pre>
 * This optimizer is cost-based. The main optimization flow is:
 * 1. Pre-processing
 *    a.  join relations, see {@linkplain JoinPreProcessor}
 *    b.  filter conditions, see {@linkplain FilterPreProcessor}
 * 2. Push-down
 *    a.  filter push-down, see {@linkplain FilterPusher}
 *    b.  order push-down, see {@linkplain OrderByPusher}
 *    c.  metadata push-down, see {@linkplain QueryBuilder}
 * 3. Index optimization
 *    a.  secondary indexes, see {@linkplain TableQuery}.convertToJoinIfNeed()
 *    b.  index selection, see {@linkplain IndexChooser}
 *    c.  filter splitting, see {@linkplain FilterSpliter}
 * 4. Join optimization
 *    a.  join strategy selection, see {@linkplain JoinChooser}
 *    b.  secondary-index join optimization, see {@linkplain Join}.convertToJoinIfNeed()
 *    c.  join order adjustment, see {@linkplain JoinChooser}
 * 5. Shard calculation (database/table sharding)
 *    a.  MergeNode construction, see {@linkplain Router}
 *    b.  node push-down, see {@linkplain QueryMergeBuilder}
 * </pre>
 */
@Slf4j
public class Optimizer extends AbstractLifecycle implements IOptimizer {
    /**
     * Map key under which a statement is stored when it is pushed down as-is,
     * i.e. without any table-name replacement. The field is final and cannot be
     * reassigned, so only a getter is exposed.
     */
    @Getter
    private static final String direct = "direct";
    /**
     * Post-processing optimizers applied, in list order, to every execution plan
     * produced by this class. Filled in {@link #doInit()}. The reference is final
     * and cannot be reassigned, so only a getter is exposed.
     */
    @Getter
    private final List<ExecutePlanOptimizer> afterExecutePlanOptimizerList = new ArrayList<ExecutePlanOptimizer>();
    /**
     * Maximum number of entries of the optimize-result cache; also handed to the
     * default {@link SqlParseManagerImpl} created in {@link #doInit()}.
     */
    @Setter
    @Getter
    private long cacheSize = 1000;
    /**
     * Expiry of cached optimize results, interpreted as milliseconds after write
     * by {@link #doInit()}; also handed to the default {@link SqlParseManagerImpl}.
     */
    @Setter
    @Getter
    private long expireTime = Constants.DEFAULT_OPTIMIZER_EXPIRE_TIME;
    /**
     * SQL parser front-end. When left {@code null}, {@link #doInit()} creates a
     * {@link SqlParseManagerImpl} configured with {@link #cacheSize} and {@link #expireTime}.
     */
    @Setter
    @Getter
    private SqlParseManager sqlParseManager;
    /**
     * Cache of optimize results keyed by the SQL text. Built in {@link #doInit()}.
     */
    @Setter
    @Getter
    private Cache<String, OptimizeResult> optimizedResults;
    /**
     * Rule engine entry point used to look up table rules.
     */
    @Setter
    @Getter
    private RouteOptimizer routeOptimizer;

    /**
     * Creates an optimizer bound to the given route optimizer.
     *
     * @param routeOptimizer rule engine entry point stored in {@link #routeOptimizer}
     */
    public Optimizer(RouteOptimizer routeOptimizer) {
        this.routeOptimizer = routeOptimizer;
    }

    /**
     * Lifecycle initialization: registers the post-processing optimizers, makes
     * sure a started SQL parse manager is available and builds the optimize-result cache.
     */
    protected void doInit() {
        // Post-processing optimizers; they are applied to a plan in registration order.
        afterExecutePlanOptimizerList.add(new AvgOptimizer());
        afterExecutePlanOptimizerList.add(new ChooseThreadOptimizer());
        afterExecutePlanOptimizerList.add(new FillRequestIdAndSubRequestId());
        afterExecutePlanOptimizerList.add(new MergeJoinExpandOptimizer());
        afterExecutePlanOptimizerList.add(new MergeQueryConcurrencyWayOptimizer());
        afterExecutePlanOptimizerList.add(new StreamingOptimizer());
        afterExecutePlanOptimizerList.add(new FillLastSequenceValueOptimizer());

        // Fall back to a default parse manager when none was injected.
        if (sqlParseManager == null) {
            SqlParseManagerImpl defaultParseManager = new SqlParseManagerImpl();
            defaultParseManager.setCacheSize(cacheSize);
            defaultParseManager.setExpireTime(expireTime);
            sqlParseManager = defaultParseManager;
        }
        // Start the parse manager unless it is already running.
        if (!sqlParseManager.isInited()) {
            sqlParseManager.init();
        }

        // Bounded cache; entries expire expireTime milliseconds after being written
        // and values are held through soft references.
        optimizedResults = CacheBuilder.newBuilder()
                .maximumSize(cacheSize)
                .expireAfterWrite(expireTime, TimeUnit.MILLISECONDS)
                .softValues()
                .build();
    }

    /**
     * Lifecycle shutdown: drops every cached optimize result, then destroys the
     * SQL parse manager.
     */
    protected void doDestroy() {
        optimizedResults.invalidateAll();
        sqlParseManager.destroy();
    }


    /**
     * Entry point for a SQL string: either builds an execution plan straight from
     * a routing hint embedded in the SQL, or runs the regular optimization.
     *
     * @param sql        SQL text, possibly carrying a hint
     * @param parameters bound parameters (may describe a batch)
     * @param cached     whether parse/optimize results may be cached
     * @param extraCmd   extra commands; extra commands found in the hint are merged
     *                   into this map (a new map is used when {@code null} is passed)
     * @return an {@link ExecutePlan} when the hint decides the routing, otherwise
     *         whatever {@code optimizeHintOrOptimizeNode} returns
     * @throws OptimizerException on optimization failure
     */
    public Object optimizeHintOrNode(String sql, Parameters parameters, boolean cached, Map<String, Object> extraCmd) throws OptimizerException {
        RouteCondition hint = HintParser.parse(sql);
        if (hint == null) {
            // No hint at all: regular optimization.
            return optimizeHintOrOptimizeNode(sql, cached, parameters, extraCmd);
        }

        // Merge the extra commands carried by the hint into the caller's extra commands.
        if (!hint.getExtraCmds().isEmpty()) {
            if (extraCmd == null) {
                extraCmd = new HashMap<String, Object>();
            }
            extraCmd.putAll(hint.getExtraCmds());
        }

        boolean routedByHint = hint instanceof DirectlyRouteCondition || hint instanceof RuleRouteCondition;
        if (!routedByHint) {
            return optimizeHintOrOptimizeNode(sql, cached, parameters, extraCmd);
        }
        // The hint decides the routing: strip it from the SQL and build the plan directly.
        return optimizerHint(HintParser.removeHint(sql), cached, hint, parameters, extraCmd);
    }

    /**
     * Convenience overload for a single route condition; delegates to the
     * list-based overload with a one-element list.
     */
    private ExecutePlan optimizerHint(String sql, boolean cached, RouteCondition routeCondition, Parameters parameters, Map<String, Object> extraCmd) {
        List<RouteCondition> singleCondition = Collections.singletonList(routeCondition);
        return optimizerHint(sql, cached, singleCondition, parameters, extraCmd);
    }

    /**
     * Builds an execution plan directly from routing hints, bypassing the regular
     * node optimization.
     * <p>
     * One group of direct plans is produced per route condition. When more than one
     * condition is given, the position of the condition is recorded as a batch index
     * on the generated plans and plans with the same data node and SQL are folded
     * together. The resulting plan is finally passed through every optimizer in
     * {@link #afterExecutePlanOptimizerList}.
     *
     * @param sql                SQL text with the hint already removed by the caller
     * @param cached             forwarded to the parse manager when the SQL has to be parsed
     * @param routeConditionList route conditions, one per batch entry
     * @param parameters         bound parameters; consulted for batch handling
     * @param extraCmd           extra commands forwarded to the post-processing optimizers
     * @return the (possibly merged) execution plan
     * @throws OptimizerException            when a query plan takes part in a batch
     * @throws UnsupportedOperationException for a route condition of an unknown kind
     */
    private ExecutePlan optimizerHint(String sql, boolean cached, List<RouteCondition> routeConditionList, Parameters parameters, Map<String, Object> extraCmd) {
        List<ExecutePlan> executePlanList = new ArrayList<ExecutePlan>();
        String groupHint = HintParser.extractDataNodeHintString(sql);
        // Build the execution plans directly from the hints.
        int index = -1; // -1 means "not a batch"
        if (routeConditionList.size() > 1) {
            index = 0;
        }
        for (RouteCondition routeCondition : routeConditionList) {
            if (routeCondition instanceof DirectlyRouteCondition) {
                DirectlyRouteCondition directlyRouteCondition = (DirectlyRouteCondition) routeCondition;
                if ($.isNotNullAndHasElement(directlyRouteCondition.getRealTableNamesStringSet())) {
                    // Real tables were specified: parse the SQL so table names can be replaced.
                    ParseInfo parseInfo = sqlParseManager.parse(sql, cached);
                    Map<String, String> directRealTableNamesStringToSqlMap = buildDirectRealTableNamesStringToSqlMap(parseInfo, directlyRouteCondition.getVirtualTableNamesString(), directlyRouteCondition.getRealTableNamesStringSet(), groupHint);
                    executePlanList.addAll(buildDirectExecutePlanList(parseInfo.getSqlType(), directlyRouteCondition.getDataNodeId(), directRealTableNamesStringToSqlMap, index));
                } else {
                    // Pushing the SQL down untouched: no SQL parsing is done at all.
                    Map<String, String> directRealTableNamesStringToSqlMap = new HashMap<String, String>();
                    directRealTableNamesStringToSqlMap.put(direct, sql);
                    executePlanList.addAll(buildDirectExecutePlanList(SqlTypeParser.getSqlType(sql), directlyRouteCondition.getDataNodeId(), directRealTableNamesStringToSqlMap, index));
                }
            } else if (routeCondition instanceof RuleRouteCondition) {
                RuleRouteCondition ruleRouteCondition = (RuleRouteCondition) routeCondition;
                ParseInfo parseInfo = sqlParseManager.parse(sql, cached);
                // Everything except SELECT / SELECT ... FOR UPDATE is routed as a write.
                boolean isWrite = (parseInfo.getSqlType() != SqlType.SELECT && parseInfo.getSqlType() != SqlType.SELECT_FOR_UPDATE);
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = OptimizerContext.getOptimizerContext().getRouteOptimizer().route(ruleRouteCondition.getVirtualTableNamesString(), ruleRouteCondition.getComparativeMapChoicer(), isWrite);
                // Table names may repeat across data nodes, so collect them into a set.
                Set<String> tableNameSet = new HashSet<String>();
                for (DataNodeDataScatterInfo dataNodeDataScatterInfo : dataNodeDataScatterInfoList) {
                    tableNameSet.addAll(dataNodeDataScatterInfo.getTableNameSet());
                }
                Map<String, String> directRealTableNamesStringToSqlMap = buildDirectRealTableNamesStringToSqlMap(parseInfo, ruleRouteCondition.getVirtualTableNamesString(), tableNameSet, groupHint);
                executePlanList.addAll(buildDirectExecutePlanList(parseInfo.getSqlType(), dataNodeDataScatterInfoList, directRealTableNamesStringToSqlMap, index));
            } else {
                throw new UnsupportedOperationException("RouteCondition : " + routeCondition.toString());
            }

            // Advance the batch index. In the non-batch case this moves -1 to 0, which
            // still takes the non-batch branch below because that branch tests index > 0.
            index++;
        }

        // Build the result.
        ExecutePlan returnExecutePlan = null;
        if (index > 0) {
            // Batch with several route conditions.
            // NOTE(review): HashMap gives no iteration-order guarantee, so the order in which
            // plans are added to the merge below is unspecified; a LinkedHashMap would keep
            // the input order — confirm whether order matters to the executor.
            Map<List<String>, ExecutePlan> dataNodeIdAndSqlListToExecutePlanMap = new HashMap<List<String>, ExecutePlan>();
            for (ExecutePlan executePlan : executePlanList) {
                if (executePlan instanceof drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) {
                    throw new OptimizerException("暂不支持select语句的batch处理");
                }

                // Plans are grouped by the pair (data node id, SQL text).
                List<String> dataNodeIdAndSqlList = Arrays.asList(executePlan.getDataNodeId(), executePlan.getSql());
                ExecutePlan $executePlan = dataNodeIdAndSqlListToExecutePlanMap.get(dataNodeIdAndSqlList);
                if ($executePlan == null) {
                    dataNodeIdAndSqlListToExecutePlanMap.put(dataNodeIdAndSqlList, executePlan);
                } else {
                    // Same data node + same SQL: merge the batch indexes; the two lists never overlap.
                    ((IPut) $executePlan).getBatchIndexList().addAll(((IPut) executePlan).getBatchIndexList());
                }
            }

            if (dataNodeIdAndSqlListToExecutePlanMap.size() == 1) {
                returnExecutePlan = dataNodeIdAndSqlListToExecutePlanMap.values().iterator().next();
            } else {
                drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.MergeQuery mergeQuery = ObjectCreateFactory.createMergeQuery();
                for (ExecutePlan $executePlan : dataNodeIdAndSqlListToExecutePlanMap.values()) {
                    mergeQuery.addExecutePlan($executePlan);
                }
                mergeQuery.setDataNodeId(mergeQuery.getExecutePlan().getDataNodeId()); // pick the first one
                returnExecutePlan = mergeQuery;
            }
        } else {
            if (executePlanList.size() == 1) {
                returnExecutePlan = executePlanList.get(0);
                // Single-database/single-table hint: batch needs special care here,
                // because such a statement carries only one hint for the whole batch.
                if (parameters != null && parameters.isBatch()) {
                    if (returnExecutePlan instanceof drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query) {
                        throw new OptimizerException("暂不支持select语句的batch处理");
                    }

                    // Assign every batch index 0..batchSize-1 to the single plan.
                    int batchSize = parameters.getIndexToSetParameterMethodAndArgsMapList().size();
                    List<Integer> batchIndexs = new ArrayList<Integer>();
                    for (int i = 0; i < batchSize; i++) {
                        batchIndexs.add(i);
                    }
                    ((IPut) returnExecutePlan).setBatchIndexList(batchIndexs);
                }
            } else {
                // NOTE(review): if no plan was produced at all, executePlanList.get(0) below
                // throws IndexOutOfBoundsException — confirm callers never pass an empty list.
                drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.MergeQuery mergeQuery = ObjectCreateFactory.createMergeQuery();
                for (ExecutePlan executePlan : executePlanList) {
                    mergeQuery.addExecutePlan(executePlan);
                }

                mergeQuery.setDataNodeId(executePlanList.get(0).getDataNodeId()); // pick the first one
                returnExecutePlan = mergeQuery;
            }
        }

        // Apply the custom post-processing optimizers.
        for (ExecutePlanOptimizer executePlanOptimizer : afterExecutePlanOptimizerList) {
            returnExecutePlan = executePlanOptimizer.optimize(returnExecutePlan, parameters, extraCmd);
        }
        return returnExecutePlan;
    }

    /**
     * Generates the SQL to push down for every real-table combination by replacing
     * logic table names through a {@link SqlVisitor}.
     * <p>
     * When either the logic table names or the real table names are missing, the
     * original SQL is returned unchanged under the {@link #direct} key.
     *
     * @param parseInfo               parse result of the statement
     * @param logicTableNamesString   comma-separated logic table names
     * @param realTableNamesStringSet each element is a comma-separated list of real
     *                                table names, positionally matching the logic names
     * @param groupHint               text placed in front of every generated SQL; {@code null} means none
     * @return generated SQL keyed by the real-table-names string (or by {@link #direct})
     * @throws OptimizerException when a real-table list and the logic-table list differ in length
     */
    private Map<String, String> buildDirectRealTableNamesStringToSqlMap(ParseInfo parseInfo, //
                                                                        String logicTableNamesString, //
                                                                        Set<String> realTableNamesStringSet, //
                                                                        String groupHint) {
        final String sqlPrefix = (groupHint == null) ? "" : groupHint;
        final Map<String, String> sqlByRealTableNames = new HashMap<String, String>();
        final Statement statement = ((ParseInfoImpl) parseInfo).getStatement();

        boolean replaceTableNames = $.isNotNullAndNotEmpty(logicTableNamesString) && $.isNotNullAndHasElement(realTableNamesStringSet);
        if (!replaceTableNames) {
            // No table mapping supplied: push the SQL down as-is, nothing to replace.
            sqlByRealTableNames.put(direct, parseInfo.getSql());
            return sqlByRealTableNames;
        }

        // Explicit tables given: rewrite the statement once per real-table combination.
        final String[] logicNames = logicTableNamesString.split(",");
        // One mapping instance is shared by all iterations; each iteration overwrites every logic-name key.
        final Map<String, String> logicToRealName = new TreeMap<String, String>();
        for (String realNamesKey : realTableNamesStringSet) {
            final String[] realNames = realNamesKey.split(",");
            if (realNames.length != logicNames.length) {
                throw new OptimizerException("hint中逻辑表和真实表数量不匹配");
            }
            for (int position = 0; position < logicNames.length; position++) {
                logicToRealName.put(logicNames[position], realNames[position]);
            }
            SqlVisitor rewriter = new SqlVisitor(new StringBuilder(sqlPrefix), logicToRealName);
            statement.accept(rewriter);
            sqlByRealTableNames.put(realNamesKey, rewriter.getSql());
        }
        return sqlByRealTableNames;
    }


    /**
     * Creates one direct execution plan per (data node, table) pair reported by the
     * rule engine.
     *
     * @param sqlType                            statement type of the plans to create
     * @param dataNodeDataScatterInfoList        routing result: data nodes and their tables
     * @param directRealTableNamesStringToSqlMap SQL text looked up by table name
     * @param index                              batch index, or -1 when not in a batch
     * @return the created plans, in routing-result order
     */
    private List<ExecutePlan> buildDirectExecutePlanList(SqlType sqlType, List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList, Map<String, String> directRealTableNamesStringToSqlMap, int index) {
        List<ExecutePlan> plans = new ArrayList<ExecutePlan>();
        for (DataNodeDataScatterInfo scatterInfo : dataNodeDataScatterInfoList) {
            String dataNodeId = scatterInfo.getDataNodeId();
            for (String tableName : scatterInfo.getTableNameSet()) {
                String tableSql = directRealTableNamesStringToSqlMap.get(tableName);
                plans.add(buildDirectExecutePlan(sqlType, dataNodeId, tableSql, index));
            }
        }
        return plans;
    }

    /**
     * Creates direct execution plans for explicitly named data nodes: every SQL in
     * the map is paired with every data node id of the comma-separated list.
     *
     * @param sqlType                            statement type of the plans to create
     * @param dataNodeIdsString                  comma-separated data node ids
     * @param directRealTableNamesStringToSqlMap SQL texts to execute (only the values are used)
     * @param index                              batch index, or -1 when not in a batch
     * @return the created plans
     */
    private List<ExecutePlan> buildDirectExecutePlanList(SqlType sqlType, String dataNodeIdsString, Map<String, String> directRealTableNamesStringToSqlMap, int index) {
        List<ExecutePlan> plans = new ArrayList<ExecutePlan>();
        for (String pushedSql : directRealTableNamesStringToSqlMap.values()) {
            String[] targetNodeIds = dataNodeIdsString.split(",");
            for (int n = 0; n < targetNodeIds.length; n++) {
                plans.add(buildDirectExecutePlan(sqlType, targetNodeIds[n], pushedSql, index));
            }
        }
        return plans;
    }

    /**
     * Creates a single direct execution plan that carries the given SQL and target
     * data node.
     *
     * @param sqlType    statement type; selects which kind of plan is created
     * @param dataNodeId data node the plan is executed on
     * @param sql        SQL text to push down
     * @param index      batch index recorded on DML plans, or -1 when not in a batch
     * @return the created plan, never {@code null}
     * @throws ShouldNeverHappenException for a statement type that has no direct plan
     */
    private ExecutePlan buildDirectExecutePlan(SqlType sqlType, String dataNodeId, String sql, int index) {
        final ExecutePlan executePlan;
        switch (sqlType) {
            case INSERT:
                executePlan = ObjectCreateFactory.createInsert();
                break;
            case REPLACE:
                executePlan = ObjectCreateFactory.createReplace();
                break;
            case SELECT:
            case SELECT_FOR_UPDATE:
                // SELECT ... FOR UPDATE is a read as well (the hint routing in this class
                // treats both types as non-writes); the locking clause stays in the SQL
                // text that is pushed down, so the same query plan is used.
                executePlan = ObjectCreateFactory.createQueryWithIndex();
                break;
            case UPDATE:
                executePlan = ObjectCreateFactory.createUpdate();
                break;
            case DELETE:
                executePlan = ObjectCreateFactory.createDelete();
                break;
            default:
                throw new ShouldNeverHappenException();
        }

        // Every branch above either assigned a plan or threw, so no null check is needed.
        executePlan.setSql(sql);
        executePlan.setDataNodeId(dataNodeId);
        if (index > -1 && executePlan instanceof IPut) {
            // Record which batch entry this DML plan belongs to.
            ((IPut) executePlan).getBatchIndexList().add(index);
        }

        return executePlan;
    }

    /**
     * Optimizes a statement that carries no routing hint.
     * <p>
     * The statement is parsed and first checked for single-database push-down; if
     * that applies an {@link ExecutePlan} is built directly, otherwise the syntax
     * tree is optimized and returned. With {@code cached} set (and a non-null SQL)
     * the outcome of that work is kept in {@link #optimizedResults} keyed by the SQL
     * text; a failure is stored in the cached entry too and rethrown on every hit.
     *
     * @return an {@link ExecutePlan} for single-database push-down, otherwise the optimized node
     */
    private Object optimizeHintOrOptimizeNode(final String sql, final boolean cached, final Parameters parameters, final Map<String, Object> extraCmd) {
        if (!cached || sql == null) {
            // Uncached path: parse and decide on the spot.
            ParseInfo parsed = sqlParseManager.parse(sql, false);
            DirectlyRouteCondition singleDbRoute = singleDataBasePreProcess(parsed, extraCmd);
            if (singleDbRoute != null) {
                // Push the statement down to the single database via a direct hint.
                return optimizerHint(sql, cached, singleDbRoute, parameters, extraCmd);
            }
            // Not a single-database statement: run the regular optimization.
            return optimizeNode(parsed.getNode(), parameters, extraCmd);
        }

        final OptimizeResult cachedResult;
        try {
            cachedResult = optimizedResults.get(sql, new Callable<OptimizeResult>() {

                public OptimizeResult call() throws Exception {
                    return computeOptimizeResult(sql, parameters, extraCmd);
                }
            });
        } catch (ExecutionException loadFailure) {
            // NOTE(review): the cause is not attached to the thrown exception.
            throw new OptimizerException("sql_process is interrupt");
        }

        if (cachedResult.exception != null) {
            throw cachedResult.exception;
        }
        if (cachedResult.directlyRouteCondition != null) {
            // Single-database push-down was chosen when the entry was computed.
            return optimizerHint(sql, cached, cachedResult.directlyRouteCondition, parameters, extraCmd);
        }
        // Hand out a private, rebuilt copy so the cached tree itself is not shared.
        drds.plus.sql_process.abstract_syntax_tree.node.Node privateCopy = cachedResult.abstractSyntaxTreeNode.deepCopy();
        privateCopy.build();
        return privateCopy;
    }

    /**
     * Cache loader body: computes the optimize result for one SQL text. Any
     * exception is captured inside the returned result instead of being thrown.
     */
    private OptimizeResult computeOptimizeResult(String sql, Parameters parameters, Map<String, Object> extraCmd) {
        OptimizeResult computed = new OptimizeResult();
        try {
            // Parse the tree once and check whether this is a single-database operation.
            ParseInfo parsed = sqlParseManager.parse(sql, false);
            DirectlyRouteCondition singleDbRoute = singleDataBasePreProcess(parsed, extraCmd);
            if (singleDbRoute != null) {
                computed.directlyRouteCondition = singleDbRoute;
            } else {
                // Not single-database: run the regular optimization.
                computed.abstractSyntaxTreeNode = optimizeNode(parsed.getNode(), parameters, extraCmd);
            }
        } catch (Exception failure) {
            computed.exception = new RuntimeException(failure);
        }
        return computed;
    }

    /**
     * Inspects the parsed statement up front to decide whether it operates on a
     * single database, mainly by checking that every referenced table has no
     * sharding rule configured.
     * <p>
     * Side effects: the node is built, DML nodes get their auto-increment flag set
     * from {@code extraCmd}, and for the special statement types handled below the
     * node's data node id (and SQL) are set.
     * <p>
     * NOTE(review): as written, the data node id of a rule-less table is hard-coded
     * to {@code null}, so {@code lastDataNodeId} never becomes non-null and the final
     * push-down block cannot execute; the method therefore returns {@code null}
     * whenever it returns normally.
     *
     * @param parseInfo parse result; must hold a syntax tree node
     * @param extraCmd  extra commands, read for the auto-increment setting
     * @return the direct route condition for single-database push-down, or
     *         {@code null} when push-down does not apply
     * @throws IllegalStateException when the parse result holds no syntax tree node
     * @throws NullPointerException  when a referenced table has no table rule
     */
    private DirectlyRouteCondition singleDataBasePreProcess(ParseInfo parseInfo, Map<String, Object> extraCmd) {
        if (!parseInfo.isAbstractSyntaxTreeNode()) {
            throw new IllegalStateException("只支持dml");
        }
        drds.plus.sql_process.abstract_syntax_tree.node.Node node = parseInfo.getNode();
        if (node instanceof Dml) {
            // Defaults to true when the option is absent from extraCmd.
            boolean processAutoIncrement = ExtraCmd.getExtraCmdBoolean(extraCmd, ConnectionProperties.process_auto_increment_by_sequence, true);
            ((Dml) node).setProcessAutoIncrement(processAutoIncrement);
        }
        node.build();
        if (parseInfo.getSqlType() == SqlType.SELECT_LAST_INSERT_ID) {
            node.setSql(parseInfo.getSql());
            node.setDataNodeId(ExecutePlan.USE_LAST_DATA_NODE);
            return null;
        } else if (parseInfo.getSqlType() == SqlType.GET_SEQUENCE) {
            node.setDataNodeId(ExecutePlan.DUAL_GROUP);
            return null;
        }

        if (node.isExistSequenceValue() || (node instanceof Dml && ((Dml) node).processAutoIncrement())) { // sequence values must be computed, so hint push-down is not possible
            return null;
        }

        String lastDataNodeId = null;
        StringBuilder $virtualTableNamesString = new StringBuilder();
        StringBuilder $realTableNamesString = new StringBuilder();
        int i = 0;
        boolean existBroadCast = false;
        boolean isAllBroadcast = true;
        // The result is consumed here as a plain set of table names.
        Set<String> tableNameSet = parseInfo.getTableNameToSchemaMap();
        for (String tableName : tableNameSet) {
            // Comma-join the logic table names.
            $virtualTableNamesString.append(tableName);
            if (i < tableNameSet.size() - 1) {
                $virtualTableNamesString.append(',');
            }
            TableRule tableRule = routeOptimizer.getTableRule(tableName);
            if (tableRule == null) {
                throw new NullPointerException("tableName:" + tableName);
            }
            isAllBroadcast &= tableRule.isBroadcast();
            existBroadCast |= tableRule.isBroadcast();

            if (tableRule.getRule() == null) {
                // No sharding rule configured for this table.
                // NOTE(review): the lookup is commented out, leaving dataNodeId always null.
                String dataNodeId = null;//tableRule.getDbNamePattern();
                if (lastDataNodeId == null || lastDataNodeId.equals(dataNodeId)) {
                    lastDataNodeId = dataNodeId;
                } else {
                    // Tables live on different data nodes: not a single-database statement.
                    return null;
                }

                $realTableNamesString.append(tableRule.getTableNamePattern()); // table name as defined in the rule
                if (i < tableNameSet.size() - 1) {
                    $realTableNamesString.append(',');
                }
            } else {
                // A sharded table is involved: no single-database push-down.
                return null;
            }

            i++;
        }

        if (existBroadCast && isAllBroadcast) {
            return null; // all tables are broadcast tables: do not use single-database push-down
        }

        if (lastDataNodeId != null) {
            drds.plus.common.model.DataNode dataNode = OptimizerContext.getOptimizerContext().getApplication().getDataNode(lastDataNodeId);
            if (dataNode != null && dataNode.getRepositoryType() == RepositoryType.mysql) {
                // Single-database push-down is supported for mysql only.
                String virtualTableNamesString = $virtualTableNamesString.toString();
                String realTableNamesString = $realTableNamesString.toString();
                DirectlyRouteCondition directlyRouteCondition = new DirectlyRouteCondition(lastDataNodeId);
                directlyRouteCondition.setVirtualTableNamesString(virtualTableNamesString);
                directlyRouteCondition.addRealTableNamesString(realTableNamesString);
                return directlyRouteCondition;
            }
        }

        return null;
    }

    //
    /**
     * Optimizes a syntax tree node according to its kind (query, insert, delete,
     * update or replace). Nodes of any other kind are returned as they are.
     *
     * @param node       node to optimize; it is built first
     * @param parameters bound parameters (not used by this method)
     * @param extraCmd   extra commands forwarded to the specific optimizers
     * @return the optimized node
     * @throws EmptyResultFilterException rethrown after the current node has been attached to it
     */
    public drds.plus.sql_process.abstract_syntax_tree.node.Node optimizeNode(drds.plus.sql_process.abstract_syntax_tree.node.Node node, Parameters parameters, Map<String, Object> extraCmd) {
        // Build once up front so the select-column information is derived.
        node.build();
        drds.plus.sql_process.abstract_syntax_tree.node.Node optimized = node;
        try {
            if (node instanceof Query) {
                optimized = optimizeQuery((Query) node, extraCmd);
            }

            if (node instanceof Insert) {
                return optimizeInsert((Insert) node, extraCmd);
            }
            if (node instanceof Delete) {
                return optimizeDelete((Delete) node, extraCmd);
            }
            if (node instanceof Update) {
                return optimizeUpdate((Update) node, extraCmd);
            }
            if (node instanceof Replace) {
                return optimizeReplace((Replace) node, extraCmd);
            }
            return optimized;
        } catch (EmptyResultFilterException emptyResult) {
            // Attach the node reached so far as context for the handler.
            emptyResult.setNode(optimized);
            throw emptyResult;
        }
    }

    /**
     * Runs the query optimization pipeline: sub-query handling, join and filter
     * pre-processing, filter push-down and join choosing.
     *
     * @param query    query to optimize
     * @param extraCmd extra commands forwarded to the individual steps
     * @return the optimized query; the untouched input when sub-queries still wait to be evaluated
     */
    private Query optimizeQuery(Query query, Map<String, Object> extraCmd) {
        // Sub-queries inside filters are optimized ahead of the enclosing query.
        List<Function> subQueryFunctions = SubQueryPreProcessor.findSubQueryFunctionList(query, true);
        for (Function subQueryFunction : subQueryFunctions) {
            Query innerQuery = (Query) subQueryFunction.getArgList().get(0);
            Query optimizedInner = optimizeQuery(innerQuery, extraCmd);
            subQueryFunction.getArgList().set(0, optimizedInner);
        }

        // If sub-queries are still pending, stop here and return the query as-is.
        if (!subQueryFunctions.isEmpty()
                && !SubQueryPreProcessor.findSubQueryFunctionList(query, false).isEmpty()) {
            return query;
        }

        // Pre-process joins.
        Query optimized = JoinPreProcessor.optimize(query);
        // Pre-process filters, e.g. remove always-false / always-true predicates.
        optimized = FilterPreProcessor.optimize(optimized, true, extraCmd);
        // Pre-process sub-query filters.
        optimized = SubQueryPreProcessor.opitmize(optimized);
        // Push the constraints down towards the leaf nodes.
        optimized = FilterPusher.optimize(optimized);
        // Visit every sub-query and optimize it (join choosing).
        optimized = JoinChooser.optimize(optimized, extraCmd);

        // Final build before handing the query back.
        optimized.build();
        return optimized;
    }

    /**
     * Optimizes an insert: converts its target table query via
     * {@code convertToJoinIfNeed()} and, when the insert has a source query,
     * optimizes that query as well.
     *
     * @return the same insert node, updated in place
     */
    private drds.plus.sql_process.abstract_syntax_tree.node.Node optimizeInsert(Insert insertNode, Map<String, Object> extraCmd) {
        TableQuery convertedTarget = (TableQuery) insertNode.getWhere().convertToJoinIfNeed();
        insertNode.setWhere(convertedTarget);

        Query sourceQuery = insertNode.getQuery();
        if (sourceQuery != null) {
            insertNode.setQuery(optimizeQuery(sourceQuery, extraCmd));
        }
        return insertNode;
    }

    /**
     * Optimizes a replace: converts its target table query via
     * {@code convertToJoinIfNeed()} and, when the replace has a source query,
     * optimizes that query as well.
     *
     * @return the same replace node, updated in place
     */
    private drds.plus.sql_process.abstract_syntax_tree.node.Node optimizeReplace(Replace replaceNode, Map<String, Object> extraCmd) {
        TableQuery convertedTarget = (TableQuery) replaceNode.getWhere().convertToJoinIfNeed();
        replaceNode.setWhere(convertedTarget);

        Query sourceQuery = replaceNode.getQuery();
        if (sourceQuery != null) {
            replaceNode.setQuery(optimizeQuery(sourceQuery, extraCmd));
        }
        return replaceNode;
    }

    /**
     * Optimizes an update by optimizing its where-query with index choosing
     * switched off.
     * <p>
     * Note that {@link ConnectionProperties#CHOOSE_INDEX} is written into the
     * {@code extraCmd} map that was passed in, so the caller's map is modified
     * (unless {@code null} was passed, in which case a local map is used).
     *
     * @param updateNode update to optimize; updated in place
     * @param extraCmd   extra commands; may be {@code null}
     * @return the same update node
     */
    private drds.plus.sql_process.abstract_syntax_tree.node.Node optimizeUpdate(Update updateNode, Map<String, Object> extraCmd) {
        updateNode.build();
        if (extraCmd == null) {
            // Parameterized instead of a raw HashMap, to avoid the unchecked conversion.
            extraCmd = new HashMap<String, Object>();
        }
        // Updates are not allowed to use an index for now.
        extraCmd.put(ConnectionProperties.CHOOSE_INDEX, "FALSE");
        Query query = this.optimizeQuery(updateNode.getWhere(), extraCmd);
        query.build();
        updateNode.setWhere((TableQuery) query);
        return updateNode;

    }

    /**
     * Optimizes a delete by optimizing its where-query and storing the result back.
     *
     * @return the same delete node, updated in place
     */
    private drds.plus.sql_process.abstract_syntax_tree.node.Node optimizeDelete(Delete deleteNode, Map<String, Object> extraCmd) {
        Query optimizedWhere = optimizeQuery(deleteNode.getWhere(), extraCmd);
        deleteNode.setWhere((TableQuery) optimizedWhere);
        return deleteNode;
    }

    //
    /**
     * Delegates to {@link SubQueryPreProcessor} for a query node, or for the
     * where-query of a DML node.
     *
     * @param node                               query or DML node
     * @param subQueryCorrelateFilterIdToValueMap values keyed by correlate filter id
     * @param extraCmd                           extra commands (not used by this method)
     * @return whatever the pre-processor returns, or {@code null} for any other node kind
     * @throws OptimizerException on failure
     */
    public Function subQueryAssignValueAndReturnNextSubQueryFunction(drds.plus.sql_process.abstract_syntax_tree.node.Node node, Map<Long, Object> subQueryCorrelateFilterIdToValueMap, Map<String, Object> extraCmd) throws OptimizerException {
        if (node instanceof Query) {
            return SubQueryPreProcessor.subQueryAssignValueAndReturnNextSubQueryFunction((Query) node, subQueryCorrelateFilterIdToValueMap);
        }
        if (!(node instanceof Dml)) {
            return null;
        }
        return SubQueryPreProcessor.subQueryAssignValueAndReturnNextSubQueryFunction(((Dml) node).getWhere(), subQueryCorrelateFilterIdToValueMap);
    }


    //
    /**
     * Turns an optimized node into an execution plan: routes it, inserts merge
     * nodes for joins where needed, pushes order-by down for queries, converts the
     * node to a plan and applies the post-processing optimizers.
     * <p>
     * The thread-local {@code LAST_SEQUENCE_VALUE} entry is always removed on exit.
     *
     * @param node       optimized node
     * @param parameters bound parameters; a batch triggers sequence pre-processing
     * @param extraCmd   extra commands forwarded to every step
     * @return the final execution plan
     * @throws OptimizerException on failure
     */
    public ExecutePlan optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(drds.plus.sql_process.abstract_syntax_tree.node.Node node, Parameters parameters, Map<String, Object> extraCmd) throws OptimizerException {
        try {
            // Sequences need special handling in batch mode.
            boolean batch = parameters != null && parameters.isBatch();
            if (batch) {
                SequencePreProcessor.opitmize(node, parameters, extraCmd);
            }

            // Sharding: choose the data nodes to execute on.
            drds.plus.sql_process.abstract_syntax_tree.node.Node routed = Router.route(node, parameters, extraCmd);
            routed = createMergeQueryNodeForJoinQueryNode(routed, extraCmd);
            if (routed instanceof Query) {
                OrderByPusher.optimize((Query) routed);
            }

            ExecutePlan plan = routed.toExecutePlan();
            // Apply the custom post-processing optimizers.
            for (ExecutePlanOptimizer postOptimizer : afterExecutePlanOptimizerList) {
                plan = postOptimizer.optimize(plan, parameters, extraCmd);
            }
            return plan;
        } finally {
            // Clear the last sequence value so it cannot interfere with later work.
            ThreadLocalMap.remove(ExecutePlan.LAST_SEQUENCE_VALUE);
        }
    }

    /**
     * Walks the node tree and, for every join whose right side is a sub-query
     * located on a different data node than the join, wraps that right side in a
     * non-sharded {@link MergeQuery}.
     *
     * @param node     root of the (sub-)tree to process; modified in place
     * @param extraCmd extra commands, passed along the recursion
     * @return the given node
     */
    private drds.plus.sql_process.abstract_syntax_tree.node.Node createMergeQueryNodeForJoinQueryNode(drds.plus.sql_process.abstract_syntax_tree.node.Node node, Map<String, Object> extraCmd) {
        if (node instanceof MergeQuery) {
            MergeQuery merge = (MergeQuery) node;
            for (drds.plus.sql_process.abstract_syntax_tree.node.Node child : merge.getNodeList()) {
                createMergeQueryNodeForJoinQueryNode(child, extraCmd);
            }
        }

        if (node instanceof Join) {
            Join join = (Join) node;
            createMergeQueryNodeForJoinQueryNode(join.getLeftNode(), extraCmd);
            createMergeQueryNodeForJoinQueryNode(join.getRightNode(), extraCmd);

            // Sub-queries on the right side get special treatment.
            if (join.getRightNode() instanceof $Query$) {
                $Query$ rightSubQuery = ($Query$) join.getRightNode();
                // Right side and join sit on different data nodes: the right side
                // needs a merge node so it can be fetched (mget).
                boolean crossesDataNodes = rightSubQuery.getDataNodeId() != null
                        && !rightSubQuery.getDataNodeId().equals(node.getDataNodeId());
                if (crossesDataNodes) {
                    MergeQuery wrapper = new MergeQuery();
                    wrapper.merge(rightSubQuery);
                    wrapper.setSharded(false);
                    wrapper.setDataNodeId(rightSubQuery.getDataNodeId());
                    wrapper.build();
                    join.setRightNode(wrapper);
                }
            }
        }

        if (node instanceof $Query$) {
            drds.plus.sql_process.abstract_syntax_tree.node.Node firstSubNode = (($Query$) node).getFirstSubNodeQueryNode();
            if (firstSubNode != null) {
                createMergeQueryNodeForJoinQueryNode(firstSubNode, extraCmd);
            }
        }

        return node;
    }

    ///////////////////////////////////


    /**
     * Builds and optimizes the syntax tree for a SQL string and returns the final
     * execution plan. The {@code cached} flag controls whether the optimized tree
     * may be cached.
     *
     * @return the plan produced from the hint, or the plan derived from the optimized node
     * @throws OptimizerException on failure
     */
    public ExecutePlan optimizeHintOrNode$optimizeNodeAndToExecutePlanAndOptimizeExecutePlanForTest(String sql, Parameters parameters, Map<String, Object> extraCmd, boolean cached) throws OptimizerException {
        Object planOrNode = optimizeHintOrNode(sql, parameters, cached, extraCmd);
        if (!(planOrNode instanceof ExecutePlan)) {
            // A node came back: it still has to be routed and converted to a plan.
            drds.plus.sql_process.abstract_syntax_tree.node.Node optimizedNode = (drds.plus.sql_process.abstract_syntax_tree.node.Node) planOrNode;
            return optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(optimizedNode, parameters, extraCmd);
        }
        return (ExecutePlan) planOrNode;
    }

    /**
     * Optimizes the given node and converts the result into the final execution plan.
     *
     * @throws OptimizerException on failure
     */
    public ExecutePlan optimizeNodeAndToExecutePlanAndOptimizeExecutePlanForTest(drds.plus.sql_process.abstract_syntax_tree.node.Node node, Parameters parameters, Map<String, Object> extraCmd) throws OptimizerException {
        return optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(optimizeNode(node, parameters, extraCmd), parameters, extraCmd);
    }


    /**
     * Returns the logger of this class (the {@code log} field generated by {@code @Slf4j}).
     */
    @Override
    public Logger log() {
        return log;
    }
}
